This notebook provides a template for implementing your functionality in stages, all of which are required to successfully complete this project. If you need additional code that cannot be included in the notebook, make sure the Python code is successfully imported and included in your submission.
Note: Once you have completed all of the code implementations, finalize your work by exporting the IPython Notebook as an HTML document. Before exporting, run all of the code cells so that reviewers can see the final implementation and output. You can then export the notebook from the menu above by navigating to File -> Download as -> HTML (.html). Include the finished document along with this notebook as your submission.
In addition to implementing code, there is a writeup to complete. The writeup should be completed in a separate file, which can be either a Markdown file or a PDF document. There is a writeup template that can be used to guide the writing process. Completing the code template and writeup template will cover all of the rubric points for this project.
The rubric contains "Stand Out Suggestions" for enhancing the project beyond the minimum requirements. The stand out suggestions are optional. If you decide to pursue them, you can include the code in this IPython notebook and also discuss the results in the writeup file.
Note: Code and Markdown cells can be executed using the Shift + Enter keyboard shortcut. In addition, Markdown cells can typically be edited by double-clicking the cell to enter edit mode.
import logging
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.INFO)
logger = logging.getLogger("trafficsigns")
import pickle
import os
import sys
import time
import datetime
import math
import random
import numpy as np
import pandas as pd
import tensorflow as tf
import sklearn
import sklearn.utils
import cv2
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt
print("# Versions")
for module in (np, pd, matplotlib, tf, sklearn, cv2):
print("{:s}=={:8s}".format(module.__package__, module.__version__))
tf_version_required = '1.0.0'
assert tf.__version__ == tf_version_required, "This code requires tensorflow=={}".format(tf_version_required)
def grayscale(x, keepdims=True):
"""Convert an array of RGB images to grayscale."""
if x.shape[-1] == 1:
# Assume is already grayscale
return x
y = .299 * x[..., 0] + .587 * x[..., 1] + .114 * x[..., 2]
if x.dtype == np.uint8:
# saturate cast to bytes
y = np.clip(y, 0, 255).astype(np.uint8)
if keepdims and x.ndim == 4:
y = y[..., np.newaxis]
return y
def showgrid(images, predictions=None, rows=None, cols=None, **kwargs):
"""Display a grid of images.
Args:
images: a 4D array
predictions: an optional list of labels or list of pairs (label, prediction == ground_truth)
rows: force the number of rows in the grid
cols: force the number of columns
kwargs: keyword arguments passed to `Axes.imshow` for every image.
"""
import matplotlib.patches as patches
n = len(images)
if rows:
cols = math.ceil(n / rows)
else:
if not cols:
cols = min(12, math.ceil(math.sqrt(n)))
rows = math.ceil(n / cols)
if images.dtype != np.uint8:
images = grayscale(images)
fig, axes = plt.subplots(rows, cols, figsize=(cols, rows), sharey=True, sharex=True)
fig.subplots_adjust(left=0, right=1, bottom=0, top=1, hspace=0.05, wspace=0.05)
if not hasattr(axes, 'flat'):
axes = np.asarray([axes])
for ax in axes.flat: ax.axis('off')
if predictions is None:
for ax, im in zip(axes.flat, images):
ax.imshow(np.squeeze(im), **kwargs)
ax.axis('on')
ax.tick_params(axis='both', left='off', top='off', right='off', bottom='off', labelleft='off', labeltop='off', labelright='off', labelbottom='off')
else:
for ax, im, prediction_ in zip(axes.flat, images, predictions):
ax.imshow(np.squeeze(im), **kwargs)
ax.axis('on')
ax.tick_params(axis='both', left='off', top='off', right='off', bottom='off', labelleft='off', labeltop='off', labelright='off', labelbottom='off')
if type(prediction_) == tuple:
prediction, ok = prediction_
else:
prediction, ok = prediction_, None
color = 'y' if ok is None else 'g' if ok else 'r'
r = patches.Rectangle((0.2, 0.2), 8, 8, color=color, alpha=.8)
ax.add_patch(r)
rx, ry = r.get_xy()
cx = rx + r.get_width()/2.0
cy = ry + r.get_height()/2.0
ax.annotate(str(prediction), (cx, cy), color='w', weight='bold',
fontsize=12, ha='center', va='center')
return fig
training_file = 'data/train.p'
validation_file = 'data/valid.p'
testing_file = 'data/test.p'
with open(training_file, mode='rb') as f:
train = pickle.load(f)
with open(validation_file, mode='rb') as f:
valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
test = pickle.load(f)
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']
# Delete unneeded variables
del train, valid, test
The pickled data is a dictionary with 4 key/value pairs:
- 'features' is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels).
- 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id.
- 'sizes' is a list containing tuples, (width, height), representing the original width and height of the image.
- 'coords' is a list containing tuples, (x1, y1, x2, y2), representing coordinates of a bounding box around the sign in the image. THESE COORDINATES ASSUME THE ORIGINAL IMAGE. THE PICKLED DATA CONTAINS RESIZED VERSIONS (32 by 32) OF THESE IMAGES.

Complete the basic data summary below. Use Python, NumPy and/or pandas methods to calculate the data summary rather than hard-coding the results. For example, the pandas shape attribute might be useful for calculating some of the summary results.
# TODO: Number of training examples
n_train = len(X_train)
# Number of validation examples
n_valid = len(X_valid)
# TODO: Number of testing examples.
n_test = len(X_test)
# TODO: What's the shape of a traffic sign image?
image_shape = X_train.shape[1:]
# TODO: How many unique classes/labels there are in the dataset.
n_classes = len(np.unique(y_train))
print("Number of training examples =", n_train)
print("Number of validation examples =", n_valid)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)
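The dataset description notes that the 'coords' bounding boxes refer to the original images, while the pickled images are resized to 32 by 32. A minimal sketch of how a box could be mapped onto the resized image follows. It assumes a plain resize without cropping or padding; `rescale_box` is a hypothetical helper and the sample values are made up, not taken from the dataset.

```python
def rescale_box(coords, size, target=(32, 32)):
    """Map a bounding box from the original image onto the resized image.

    coords: (x1, y1, x2, y2) in original-image pixels
    size:   (width, height) of the original image
    target: (width, height) of the resized image
    """
    x1, y1, x2, y2 = coords
    width, height = size
    sx = target[0] / width   # horizontal scale factor
    sy = target[1] / height  # vertical scale factor
    return (x1 * sx, y1 * sy, x2 * sx, y2 * sy)


# Hypothetical example: a 64x64 original image
box = rescale_box((8, 16, 56, 48), (64, 64))
print(box)
```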
Visualize the German Traffic Signs Dataset using the pickled file(s). This is open ended, suggestions include: plotting traffic sign images, plotting the count of each sign, etc.
The Matplotlib examples and gallery pages are a great resource for doing visualizations in Python.
NOTE: It's recommended you start with something simple first. If you wish to do more, come back to it after you've completed the rest of the sections.
SIGNNAMES = pd.read_csv("signnames.csv", index_col=0).sort_index().SignName
def display_class_frequencies(labels_and_legend, normalized=True):
"""Display class frequencies."""
dataset = {}
for y, name in labels_and_legend:
if normalized:
n = len(y)
else:
n = 1
dataset[name] = [(y == label).sum() / n for label in range(n_classes)]
dataset = pd.DataFrame(dataset)
fig, ax = plt.subplots(1, 1, figsize=(20,5), )
ax.set_title('class frequency')
dataset.plot.bar(ax=ax)
ax.grid(axis='y', alpha=.5)
if not normalized:
ylim = ax.get_ylim()[1]
if ylim > 4000:
step = 400
else:
step = 200
ax.set_yticks(np.arange(0, ylim, step))
display_class_frequencies([(y_train, 'train'), (y_valid, 'valid'), (y_test, 'test')], normalized=False)
The classes are heavily imbalanced, although this may reflect how often each sign actually occurs.
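To put a number on the imbalance, one option is to compare the largest and smallest class counts. The sketch below uses toy labels rather than the real `y_train`; `class_imbalance` is a hypothetical helper, not part of the notebook.

```python
import numpy as np


def class_imbalance(labels, n_classes):
    """Return per-class counts and the ratio between the largest and smallest class."""
    counts = np.bincount(labels, minlength=n_classes)
    # Guard against empty classes to avoid a division by zero
    ratio = counts.max() / max(counts.min(), 1)
    return counts, ratio


# Toy labels for illustration only
toy_labels = np.array([0, 0, 0, 0, 1, 1, 2])
counts, ratio = class_imbalance(toy_labels, 3)
print(counts, ratio)
```

With the real data this would be called as `class_imbalance(y_train, n_classes)`.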
def display_random_sample(images, labels, samples_per_class, classes=None, dpi=96, **kwargs):
"""Display a random sample of images."""
if classes is None:
classes = sorted(np.unique(labels))
n_classes = len(classes)
# Handle grayscale images
images = np.squeeze(images)
if images.dtype != np.uint8 and images.shape[-1] == 3:
# Convert back to bytes (assuming preprocess)
p = lambda x: np.clip(x * 128 + 128, 0, 255).astype(np.uint8)
else:
p = lambda x: x
fig, axes = plt.subplots(n_classes, 1, figsize=(samples_per_class*1.5, n_classes*2), dpi=dpi, **kwargs)
for k, label in enumerate(classes):
labels_mask = labels == label
n_label_samples = labels_mask.sum()
# pick a random sample matching current label
samples = images[np.random.choice(np.where(labels_mask)[0], samples_per_class, replace=False)]
sample_image = np.hstack(samples)
ax = axes[k]
ax.imshow(p(sample_image), cmap='gray')
ax.axis('off')
ax.text(0, -5, "{:2d}. {:s} ({:d} samples)".format(label, SIGNNAMES[label], n_label_samples), ha='left', va='top', fontsize=10)
Display a random sample of 10 images per class from the training set.
display_random_sample(X_train, y_train, 10, dpi=72)
Design and implement a deep learning model that learns to recognize traffic signs. Train and test your model on the German Traffic Sign Dataset.
There are various aspects to consider when thinking about this problem:
Here is an example of a published baseline model on this problem. Familiarity with the approach used in the paper is not required, but it's good practice to try to read papers like these.
NOTE: The LeNet-5 implementation shown in the classroom at the end of the CNN lesson is a solid starting point. You'll have to change the number of classes and possibly the preprocessing, but aside from that it's plug and play!
There are notable variations in the illumination conditions of the images.
Use the code cell (or multiple code cells, if necessary) to implement the first step of your project.
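To illustrate how histogram equalization compensates for illumination differences, here is a minimal NumPy sketch of global equalization on a single grayscale image. It is a simplified stand-in, not the CLAHE step used in the preprocessing code, and its rounding may differ slightly from `cv2.equalizeHist`. The name `equalize_hist` is hypothetical.

```python
import numpy as np


def equalize_hist(gray):
    """Global histogram equalization of a 2D uint8 image."""
    hist = np.bincount(gray.ravel(), minlength=256)
    cdf = hist.cumsum()
    cdf_min = cdf[cdf > 0][0]   # CDF value at the darkest level present
    span = cdf[-1] - cdf_min
    if span == 0:
        # Constant image: nothing to stretch
        return gray.copy()
    # Lookup table that spreads the used gray levels over [0, 255]
    lut = np.clip(np.round((cdf - cdf_min) * 255.0 / span), 0, 255).astype(np.uint8)
    return lut[gray]


# A dark toy image whose values only span [10, 40]
dark = np.array([[10, 20], [30, 40]], dtype=np.uint8)
bright = equalize_hist(dark)
print(bright)
```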
def equalize(im, clahe=None):
if im.shape[-1] == 3:
lab = cv2.cvtColor(im, cv2.COLOR_RGB2LAB)
if clahe is None:
lab[...,0] = cv2.equalizeHist(lab[...,0])
else:
lab[...,0] = clahe.apply(lab[...,0])
return cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
elif im.shape[-1] == 1:
if clahe is None:
dst = cv2.equalizeHist(im[..., 0])
else:
dst = clahe.apply(im[..., 0])
return dst[..., np.newaxis]
else:
raise ValueError("unsupported shape {}".format(im.shape))
def preprocess(source):
"""Preprocess RGB images.
Parameters
----------
source : RGB image or array of RGB images
"""
if source.ndim == 3:
# Support just one RGB image adding an extra dimension
source = source[np.newaxis, ...]
# Convert to grayscale
x = grayscale(source)
# Or keep RGB
# x = source.copy()
# Local histogram equalization (CLAHE)
clahe = cv2.createCLAHE(clipLimit=10, tileGridSize=(2,2))
for im in x:
im[:] = equalize(im, clahe)
# scale to [-1, 1] and convert to float32
return np.float32((x - 128.0) / 128.0)
training_dataset = preprocess(X_train), y_train
validation_dataset = preprocess(X_valid), y_valid
testing_dataset = preprocess(X_test), y_test
display_random_sample(training_dataset[0], y_train, 10, classes=(2,20,30,40), dpi=72)
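The scaling applied at the end of `preprocess` and the inverse mapping used by `display_random_sample` can be checked in isolation. The sketch below reproduces both formulas on a few byte values; `to_unit_range` and `to_bytes` are hypothetical names. Note that the largest byte value maps to 127/128, slightly below 1.

```python
import numpy as np


def to_unit_range(x):
    """Scale byte values to roughly [-1, 1] as float32."""
    return np.float32((x - 128.0) / 128.0)


def to_bytes(x):
    """Inverse mapping back to uint8, saturating at the byte limits."""
    return np.clip(x * 128 + 128, 0, 255).astype(np.uint8)


pixels = np.array([0, 64, 128, 255], dtype=np.uint8)
scaled = to_unit_range(pixels)
restored = to_bytes(scaled)
print(scaled, restored)
```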
Save the preprocessed data on disk.
PREPROCESSED_DATA_DIR = "data/preprocessed3"
os.makedirs(PREPROCESSED_DATA_DIR, exist_ok=True)
datasets = (('train', training_dataset),
('valid', validation_dataset),
('test', testing_dataset))
for name, dataset in datasets:
with open("{}/{}.p".format(PREPROCESSED_DATA_DIR, name), "wb") as f:
pickle.dump(dataset, f)
def build_graph(model_architecture, params):
    """Build the classification graph and return its placeholders and main operations."""
    is_training = tf.placeholder_with_default(tf.constant(False), None, name='is_training')
    x = tf.placeholder(tf.float32, (None,)+params['image_shape'], name='x')
    y = tf.placeholder(tf.int32, (None,), name='y')
    one_hot_y = tf.one_hot(y, params['n_classes'])
    endpoints = model_architecture(x, params, is_training)
    assert type(endpoints) == dict
    logits = endpoints['logits']
    # Loss: mean cross-entropy plus any regularization losses registered by the layers
    cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=one_hot_y)
    reg_losses = tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
    if reg_losses:
        # tf.add_n fails on an empty list, so only sum when there is something to add
        logger.info("Adding regularization to the loss operation")
        regularization_loss = tf.add_n(reg_losses)
    else:
        regularization_loss = tf.constant(0.0)
    loss = tf.add(tf.reduce_mean(cross_entropy), regularization_loss, name='loss')
    # Evaluation
    correct_prediction = tf.equal(tf.argmax(logits, 1), tf.argmax(one_hot_y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32), name='accuracy')
    return (x, y), logits, accuracy, loss, endpoints
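The loss assembled in `build_graph` is the mean softmax cross-entropy plus the regularization terms. A NumPy sketch of the same arithmetic may make it easier to reason about. It assumes an L2 penalty of the form scale * sum(w**2) / 2; `softmax_cross_entropy` and `total_loss` are hypothetical helpers, and the weights are toy values.

```python
import numpy as np


def softmax_cross_entropy(logits, labels):
    """Mean softmax cross-entropy for integer class labels."""
    # Subtract the row maximum for numerical stability
    shifted = logits - logits.max(axis=1, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=1, keepdims=True))
    return -log_probs[np.arange(len(labels)), labels].mean()


def total_loss(logits, labels, weights, l2_scale):
    """Cross-entropy plus an L2 penalty (assumed form: scale * sum(w**2) / 2)."""
    penalty = sum(l2_scale * np.sum(w ** 2) / 2 for w in weights)
    return softmax_cross_entropy(logits, labels) + penalty


# Uniform logits over 4 classes give a cross-entropy of log(4)
logits = np.zeros((2, 4))
labels = np.array([0, 3])
ce = softmax_cross_entropy(logits, labels)
loss = total_loss(logits, labels, [np.array([1.0, 2.0])], l2_scale=0.1)
print(ce, loss)
```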
# Spatial transformer adapted to TensorFlow 1.0 from
# https://github.com/tensorflow/models/tree/master/transformer
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
#import tensorflow as tf
def transformer(U, theta, out_size, name='SpatialTransformer', **kwargs):
"""Spatial Transformer Layer
Implements a spatial transformer layer as described in [1]_.
Based on [2]_ and edited by David Dao for Tensorflow.
Parameters
----------
U : float
The output of a convolutional net should have the
shape [num_batch, height, width, num_channels].
theta: float
The output of the
localisation network should be [num_batch, 6].
out_size: tuple of two ints
The size of the output of the network (height, width)
References
----------
.. [1] Spatial Transformer Networks
Max Jaderberg, Karen Simonyan, Andrew Zisserman, Koray Kavukcuoglu
Submitted on 5 Jun 2015
.. [2] https://github.com/skaae/transformer_network/blob/master/transformerlayer.py
Notes
-----
To initialize the network to the identity transform init
``theta`` to :
identity = np.array([[1., 0., 0.],
[0., 1., 0.]])
identity = identity.flatten()
theta = tf.Variable(initial_value=identity)
"""
def _repeat(x, n_repeats):
with tf.variable_scope('_repeat'):
rep = tf.transpose(
tf.expand_dims(tf.ones(shape=(n_repeats,)), 1), [1, 0])
rep = tf.cast(rep, 'int32')
x = tf.matmul(tf.reshape(x, (-1, 1)), rep)
return tf.reshape(x, [-1])
def _interpolate(im, x, y, out_size):
with tf.variable_scope('_interpolate'):
# constants
num_batch = tf.shape(im)[0]
height = tf.shape(im)[1]
width = tf.shape(im)[2]
channels = tf.shape(im)[3]
x = tf.cast(x, 'float32')
y = tf.cast(y, 'float32')
height_f = tf.cast(height, 'float32')
width_f = tf.cast(width, 'float32')
out_height = out_size[0]
out_width = out_size[1]
zero = tf.zeros([], dtype='int32')
max_y = tf.cast(height - 1, 'int32')
max_x = tf.cast(width - 1, 'int32')
# scale indices from [-1, 1] to [0, width/height]
x = (x + 1.0)*(width_f) / 2.0
y = (y + 1.0)*(height_f) / 2.0
# do sampling
x0 = tf.cast(tf.floor(x), 'int32')
x1 = x0 + 1
y0 = tf.cast(tf.floor(y), 'int32')
y1 = y0 + 1
x0 = tf.clip_by_value(x0, zero, max_x)
x1 = tf.clip_by_value(x1, zero, max_x)
y0 = tf.clip_by_value(y0, zero, max_y)
y1 = tf.clip_by_value(y1, zero, max_y)
dim2 = width
dim1 = width*height
base = _repeat(tf.range(num_batch)*dim1, out_height*out_width)
base_y0 = base + y0*dim2
base_y1 = base + y1*dim2
idx_a = base_y0 + x0
idx_b = base_y1 + x0
idx_c = base_y0 + x1
idx_d = base_y1 + x1
# use indices to lookup pixels in the flat image and restore
# channels dim
im_flat = tf.reshape(im, tf.stack([-1, channels]))
im_flat = tf.cast(im_flat, 'float32')
Ia = tf.gather(im_flat, idx_a)
Ib = tf.gather(im_flat, idx_b)
Ic = tf.gather(im_flat, idx_c)
Id = tf.gather(im_flat, idx_d)
# and finally calculate interpolated values
x0_f = tf.cast(x0, 'float32')
x1_f = tf.cast(x1, 'float32')
y0_f = tf.cast(y0, 'float32')
y1_f = tf.cast(y1, 'float32')
wa = tf.expand_dims(((x1_f-x) * (y1_f-y)), 1)
wb = tf.expand_dims(((x1_f-x) * (y-y0_f)), 1)
wc = tf.expand_dims(((x-x0_f) * (y1_f-y)), 1)
wd = tf.expand_dims(((x-x0_f) * (y-y0_f)), 1)
output = tf.add_n([wa*Ia, wb*Ib, wc*Ic, wd*Id])
return output
def _meshgrid(height, width):
with tf.variable_scope('_meshgrid'):
# This should be equivalent to:
# x_t, y_t = np.meshgrid(np.linspace(-1, 1, width),
# np.linspace(-1, 1, height))
# ones = np.ones(np.prod(x_t.shape))
# grid = np.vstack([x_t.flatten(), y_t.flatten(), ones])
x_t = tf.matmul(tf.ones(shape=tf.stack([height, 1])),
tf.transpose(tf.expand_dims(tf.linspace(-1.0, 1.0, width), 1), [1, 0]))
y_t = tf.matmul(tf.expand_dims(tf.linspace(-1.0, 1.0, height), 1),
tf.ones(shape=tf.stack([1, width])))
x_t_flat = tf.reshape(x_t, (1, -1))
y_t_flat = tf.reshape(y_t, (1, -1))
ones = tf.ones_like(x_t_flat)
grid = tf.concat([x_t_flat, y_t_flat, ones], axis=0)
return grid
def _transform(theta, input_dim, out_size):
with tf.variable_scope('_transform'):
num_batch = tf.shape(input_dim)[0]
height, width, num_channels = input_dim.get_shape().as_list()[1:]
theta = tf.reshape(theta, (-1, 2, 3))
theta = tf.cast(theta, 'float32')
# grid of (x_t, y_t, 1), eq (1) in ref [1]
height_f = tf.cast(height, 'float32')
width_f = tf.cast(width, 'float32')
out_height = out_size[0]
out_width = out_size[1]
grid = _meshgrid(out_height, out_width)
grid = tf.expand_dims(grid, 0)
grid = tf.reshape(grid, [-1])
grid = tf.tile(grid, tf.stack([num_batch]))
grid = tf.reshape(grid, tf.stack([num_batch, 3, -1]))
# Transform A x (x_t, y_t, 1)^T -> (x_s, y_s)
T_g = tf.matmul(theta, grid)
x_s = tf.slice(T_g, [0, 0, 0], [-1, 1, -1])
y_s = tf.slice(T_g, [0, 1, 0], [-1, 1, -1])
x_s_flat = tf.reshape(x_s, [-1])
y_s_flat = tf.reshape(y_s, [-1])
input_transformed = _interpolate(
input_dim, x_s_flat, y_s_flat,
out_size)
output = tf.reshape(
input_transformed, tf.stack([num_batch, out_height, out_width, num_channels]))
return output
with tf.variable_scope(name):
output = _transform(theta, U, out_size)
return output
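The grid generation in `_meshgrid` and `_transform` can be mirrored in NumPy, which is handy for checking what a given `theta` does before training. The sketch below covers only the coordinate transform, not the bilinear sampling; `affine_grid` is a hypothetical name.

```python
import numpy as np


def affine_grid(theta, out_height, out_width):
    """Return source coordinates (x_s, y_s) in [-1, 1] for a flattened 2x3 affine matrix."""
    x_t, y_t = np.meshgrid(np.linspace(-1, 1, out_width),
                           np.linspace(-1, 1, out_height))
    # Homogeneous target grid of shape (3, out_height * out_width)
    grid = np.vstack([x_t.ravel(), y_t.ravel(), np.ones(x_t.size)])
    x_s, y_s = np.asarray(theta, dtype=float).reshape(2, 3) @ grid
    return x_s.reshape(out_height, out_width), y_s.reshape(out_height, out_width)


# The identity transform samples the input at the target grid itself
x_s, y_s = affine_grid([1., 0., 0., 0., 1., 0.], 4, 4)

# Scaling by 0.5 samples only the central half of the input (a 2x zoom)
zx, zy = affine_grid([0.5, 0., 0., 0., 0.5, 0.], 4, 4)
print(x_s[0], zx[0])
```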
import tensorflow.contrib.slim as slim
def spatial_transformer_layer(inputs, downsample, is_training=False, scope=None):
image_shape = inputs.get_shape().as_list()[1:]
image_size = image_shape[0] * image_shape[1] * image_shape[2]
output_size = (image_shape[0]//downsample, image_shape[1]//downsample)
x = tf.reshape(inputs, (-1, image_size))
with tf.variable_scope(scope):
fc = slim.fully_connected(x, 6, scope='loc',
activation_fn=tf.nn.relu,
weights_initializer=tf.zeros_initializer(),
biases_initializer=tf.constant_initializer([1., 0, 0, 0, 1., 0]))
return transformer(inputs, fc, output_size)
def test_spatial_transformer():
tf.reset_default_graph()
x = tf.placeholder(tf.float32, (None,32,32,3))
layer = spatial_transformer_layer(x, 2, scope='transformer')
with tf.Session() as session:
session.run(tf.global_variables_initializer())
y = session.run(layer, feed_dict={x: np.ones((3,32,32,3), dtype=np.uint8)})
assert y.shape == (3, 16, 16, 3)
tf.reset_default_graph()
test_spatial_transformer()
def get_model_architecture(name=None):
import tensorflow.contrib.slim as slim
def architecture(inputs, params, is_training):
endpoints = {}
net = inputs
net = slim.dropout(net, .9, is_training=is_training, scope='dropout0')
with slim.arg_scope([slim.conv2d, slim.max_pool2d], padding='SAME'):
net = spatial_transformer_layer(net, downsample=1, is_training=is_training, scope='transformer1')
endpoints['transformer1'] = net
with slim.arg_scope([slim.conv2d, slim.fully_connected],
activation_fn=None,
weights_initializer=slim.xavier_initializer(),
weights_regularizer=slim.l2_regularizer(params.get('l2_regularizer', 0.005)),
normalizer_fn=slim.batch_norm,
normalizer_params=dict(decay=0.9, activation_fn=tf.nn.relu, updates_collections=None, is_training=is_training, scope='bn')):
net = slim.repeat(net, 3, slim.conv2d, 128, 3, scope='conv1')
endpoints['conv1'] = net
net = slim.max_pool2d(net, (2,2), scope='pool1')
net = slim.dropout(net, .9, is_training=is_training, scope='dropout1')
net = slim.repeat(net, 3, slim.conv2d, 256, 5, scope='conv2')
endpoints['conv2'] = net
net = slim.max_pool2d(net, (2,2), scope='pool2')
net = slim.dropout(net, .7, is_training=is_training, scope='dropout1')
net = slim.repeat(net, 3, slim.conv2d, 512, 3, scope='conv3')
endpoints['conv3'] = net
net = slim.max_pool2d(net, (2,2), scope='pool3')
net = slim.dropout(net, .5, is_training=is_training, scope='dropout1')
net = slim.flatten(net)
net = slim.fully_connected(net, 1024, scope='fc1')
endpoints['fc1'] = net
net = slim.fully_connected(net, 512, scope='fc2')
endpoints['fc2'] = net
net = slim.dropout(net, .5, is_training=is_training, scope='dropout2')
net = slim.fully_connected(net, params['n_classes'], activation_fn=None, scope='logits')
endpoints['logits'] = net
return endpoints
return architecture
# Test that we can build the model
def test_model_construction():
try:
model = get_model_architecture()
logger.info(model.__name__)
tf.reset_default_graph()
graph = tf.Graph()
with graph.as_default():
build_graph(model, {'n_classes':43, 'batch_size':128, 'image_shape': (32,32,3)})
with tf.Session(graph=graph) as session:
session.run(tf.global_variables_initializer())
logger.info("Available weights")
variables = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
for var in variables:
logger.info("- {} {}".format(var.name, var.get_shape()))
except Exception as e:
logger.info(e)
raise e
test_model_construction()
A validation set can be used to assess how well the model is performing. Low accuracy on both the training and validation sets implies underfitting. High accuracy on the training set but low accuracy on the validation set implies overfitting.
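This rule of thumb can be written down as a small check. The thresholds below (`min_acc`, `max_gap`) are arbitrary illustrative assumptions, not values prescribed by the project, and `diagnose` is a hypothetical helper.

```python
def diagnose(train_acc, valid_acc, min_acc=0.9, max_gap=0.05):
    """Classify a training run from its training and validation accuracy."""
    if train_acc < min_acc and valid_acc < min_acc:
        # Neither set is learned well
        return "underfitting"
    if train_acc - valid_acc > max_gap:
        # The model does much better on data it has seen
        return "overfitting"
    return "ok"


print(diagnose(0.60, 0.58))
print(diagnose(0.99, 0.85))
print(diagnose(0.98, 0.96))
```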
# A basic batch iterator helper. TensorFlow offers more complete support for this.
class BatchIterator:
    def __init__(self, batch_size, shuffle=True):
        self.batch_size = batch_size
        self.shuffle = shuffle

    def __call__(self, x, y):
        """Yield (features, labels) batches; the last batch may be smaller."""
        BATCH_SIZE = self.batch_size
        if self.shuffle:
            x, y = sklearn.utils.shuffle(x, y)
        for offset in range(0, len(x), BATCH_SIZE):
            yield (x[offset:offset+BATCH_SIZE], y[offset:offset+BATCH_SIZE])
# Simple early stopping mechanism with patience.
class EarlyStopping:
    def __init__(self, saver, restore_path, patience, minimize=True):
        self.saver = saver
        self.patience = patience
        self.minimize = minimize
        # Start from the worst possible value so that the first epoch always improves on it
        self.best_value = np.inf if minimize else -np.inf
        self.best_epoch = 0
        self.restore_path = None
        self.checkpoint_path = restore_path + "-early_stopping_checkpoint"

    def __del__(self):
        # Remove the temporary checkpoint files
        dirname = os.path.dirname(self.checkpoint_path) or '.'
        basename = os.path.basename(self.checkpoint_path)
        for filename in os.listdir(dirname):
            if filename.startswith(basename):
                os.unlink(os.path.join(dirname, filename))

    def __call__(self, session, value, epoch):
        """Record the monitored value; return True when training should stop."""
        if (self.minimize and value < self.best_value) or \
           (not self.minimize and value > self.best_value):
            self.best_value = value
            self.best_epoch = epoch
            self.restore_path = self.saver.save(session, self.checkpoint_path)
        elif self.best_epoch + self.patience < epoch:
            # We waited enough, restore and stop
            if self.restore_path is not None:
                self.saver.restore(session, self.restore_path)
            else:
                raise RuntimeError("Failed to restore session")
            return True
        return False
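The patience rule in `EarlyStopping` can be traced without TensorFlow. The sketch below replays a list of made-up validation losses through the same comparison and reports the best epoch and the epoch at which training would stop; `best_epoch_with_patience` is a hypothetical helper that leaves out the checkpointing.

```python
def best_epoch_with_patience(losses, patience):
    """Return (best_epoch, stop_epoch); stop_epoch is None if training runs to the end."""
    best_value = float("inf")
    best_epoch = 0
    for epoch, value in enumerate(losses):
        if value < best_value:
            # New best: remember it (the real class also saves a checkpoint here)
            best_value, best_epoch = value, epoch
        elif best_epoch + patience < epoch:
            # More than `patience` epochs have passed since the best one
            return best_epoch, epoch
    return best_epoch, None


# Made-up validation losses for illustration
losses = [1.0, 0.8, 0.9, 0.85, 0.95, 0.7]
print(best_epoch_with_patience(losses, patience=2))
print(best_epoch_with_patience(losses, patience=5))
```

With a patience of 2 the run stops before reaching the later improvement at the last epoch, which shows the trade-off behind the choice of patience.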
def evaluate(session, batch_size, features, labels):
    """Return (accuracy, loss) averaged over all samples."""
    # Shuffling is unnecessary for evaluation
    batch_iterator = BatchIterator(batch_size, shuffle=False)
    accuracy = loss = 0
    for batch_x, batch_y in batch_iterator(features, labels):
        acc_, loss_ = session.run(['accuracy:0', 'loss:0'], feed_dict={'x:0': batch_x, 'y:0': batch_y, 'is_training:0': False})
        # Weight each batch by its size, since the last batch may be smaller
        accuracy += (acc_ * len(batch_x))
        loss += (loss_ * len(batch_x))
    return accuracy / len(features), loss / len(features)
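`evaluate` weights each batch metric by the batch length before dividing by the dataset size. The toy example below shows why this matters when the last batch is smaller than the others; `weighted_mean` is a hypothetical helper and the numbers are made up.

```python
def weighted_mean(batch_values, batch_sizes):
    """Average per-batch metrics, weighting each batch by its number of samples."""
    total = sum(value * size for value, size in zip(batch_values, batch_sizes))
    return total / sum(batch_sizes)


# Two batches: 4 samples at 50% accuracy, then 1 leftover sample at 100%
batch_accuracies = [0.5, 1.0]
batch_sizes = [4, 1]
weighted = weighted_mean(batch_accuracies, batch_sizes)
unweighted = sum(batch_accuracies) / len(batch_accuracies)
print(weighted, unweighted)
```

The unweighted mean overstates the accuracy because the single leftover sample counts as much as a full batch.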
def train_model(model, params, training_data, validation_data, restore=False):
# Ensure the dir exists
os.makedirs(MODEL_PATH, exist_ok=True)
# Where do we save the model
model_path = os.path.join(MODEL_PATH, model.__name__)
train_size = len(training_data[0])
batch_size = params['batch_size']
epochs = params['epochs']
# Reset the session...
if tf.get_default_session():
tf.get_default_session().close()
# Build our graph
train_graph = tf.Graph()
with train_graph.as_default():
logger.info("Building graph...")
(x, y), logits, accuracy_operation, loss_operation, endpoints \
= build_graph(model, params)
# FIXME: make the train operation depend on pending update operations.
# This is mostly for batch normalization, but it is still not working.
# I need a better understanding of TensorFlow internals. Meanwhile,
# batch normalization works by setting `updates_collections=None`.
update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
if update_ops:
from tensorflow.python.ops import control_flow_ops
with tf.control_dependencies(update_ops):
barrier = control_flow_ops.no_op(name='update_barrier')
loss_operation = control_flow_ops.with_dependencies([barrier], loss_operation)
total_batches = math.ceil(train_size / batch_size) * epochs
# Batch counter for the exponential decay
batch = tf.Variable(0, dtype=tf.int32, name='batch')
# Decay smoothly (staircase=False) from the base rate to base * decay rate over the whole run.
learning_rate = tf.train.exponential_decay(
params['learning_rate'], # Base learning rate.
batch, # Global step: number of batches processed so far.
total_batches, # Decay steps.
params.get('learning_decay', 0.1), # Decay rate.
staircase=False,
name='learning_rate')
training_operation = tf.train.AdamOptimizer(learning_rate).minimize(loss_operation, global_step=batch)
history = []
with tf.Session(graph=train_graph) as session:
logger.info("Initialising...")
session.run(tf.global_variables_initializer())
saver = tf.train.Saver()
if restore:
# continue from a previous session
saver.restore(session, model_path)
logger.info("Model restored from {}".format(model_path))
logger.info("Training {} with {} samples in batches of {}...".format(
model.__name__, len(training_data[0]), batch_size))
batch_shuffle_iterator = BatchIterator(batch_size, shuffle=True)
early_stopping = EarlyStopping(saver, model_path, params['early_stopping_patience'], minimize=True)
try:
time_elapsed_avg = 0
for epoch in range(epochs):
time_start = time.perf_counter()
# Training pass
for batch_x, batch_y in batch_shuffle_iterator(*training_data):
_, lr = session.run([training_operation, learning_rate], feed_dict={x: batch_x, y: batch_y, 'is_training:0': True})
# Evaluate performance on the full training and validation sets
training_performance = evaluate(session, batch_size, *training_data)
validation_performance = evaluate(session, batch_size, *validation_data)
performance = training_performance + validation_performance
history.append(performance)
# Measure time
time_end = time.perf_counter()
time_elapsed = time_end - time_start
time_elapsed_avg = (time_elapsed_avg * epoch + time_elapsed) / (epoch + 1)
time_remaining = datetime.timedelta(seconds=math.ceil((epochs - epoch - 1) * time_elapsed_avg))
validation_accuracy, validation_loss = validation_performance
# Symbols: 👍🔥👎🔺🔻
if validation_loss < early_stopping.best_value:
progress_symbol = '🔥'
else:
progress_symbol = '👎'
logger.info(" EPOCH {:3d}... Learning Rate = {:.1e} Training and Validation Accuracy = {:.3f}, {:.3f} Loss = {:.3f}, {:.3f} {}; {:.0f} sec ETA {}".format(
epoch+1, lr, *map(performance.__getitem__, (0,2,1,3)), progress_symbol, time_elapsed, time_remaining))
if validation_loss < 1e-4:
logger.info("Early stopping. Validation Loss below threshold.")
break
if early_stopping(session, validation_loss, epoch):
raise StopIteration()
except (KeyboardInterrupt, StopIteration):
logger.info("Early stopping. Best monitored loss was {:.3f} at epoch {}.".format(
early_stopping.best_value, early_stopping.best_epoch+1))
history = history[:early_stopping.best_epoch+1]
history = pd.DataFrame(data=history, columns=('training_accuracy', 'training_loss', 'validation_accuracy', 'validation_loss'))
restore_path = saver.save(session, model_path)
logger.info("Model saved in {}".format(restore_path))
return history, restore_path
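The learning rate schedule used in `train_model` follows the formula base_rate * decay_rate ** (step / decay_steps). Because `decay_steps` is set to the total number of batches and `staircase=False`, the rate shrinks continuously and reaches base_rate * decay_rate at the end of the run. A plain Python sketch, where `total_batches = 1000` is an arbitrary illustrative value:

```python
def exponential_decay(base_rate, step, decay_steps, decay_rate):
    """Continuous exponential decay (the staircase=False case)."""
    return base_rate * decay_rate ** (step / decay_steps)


# Values from the notebook's params: learning_rate=0.001, learning_decay=0.01
total_batches = 1000  # assumed here for illustration
lr_start = exponential_decay(0.001, 0, total_batches, 0.01)
lr_middle = exponential_decay(0.001, total_batches // 2, total_batches, 0.01)
lr_end = exponential_decay(0.001, total_batches, total_batches, 0.01)
print(lr_start, lr_middle, lr_end)
```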
def display_training_history(training_history):
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
ax = axes[0]
ax.plot(training_history.validation_loss, label="validation")
ax.plot(training_history.training_loss, label="training")
ax.set_ylabel("Loss")
ax.set_xlabel("Epoch")
ax.set_title("Training Loss")
ax.legend(loc=0)
ax.grid("on")
ax = axes[1]
ax.plot(training_history.validation_accuracy, label="validation")
ax.plot(training_history.training_accuracy, label="training")
ax.set_ylabel("Accuracy")
ax.set_xlabel("Epoch")
ax.set_title("Training Accuracy")
ax.legend(loc=0)
ax.grid("on")
def test_model(model, params, testing_data, restore_path=None):
if restore_path is None:
restore_path = os.path.join(MODEL_PATH, model.__name__)
graph = tf.Graph()
with graph.as_default():
(x, y), logits, accuracy_operation, loss_operation, _ = build_graph(model, params)
with tf.Session(graph=graph) as session:
session.run(tf.global_variables_initializer())
saver = tf.train.Saver()
logger.info("Restoring model from '{}'".format(restore_path))
saver.restore(session, restore_path)
logger.info("Testing...")
testing_accuracy, testing_loss = evaluate(session, params['batch_size'], *testing_data)
logger.info("Testing Accuracy = {:.3f} Loss = {:.3f}".format(testing_accuracy, testing_loss))
def predict(model, params, X_data, restore_path=None):
    """Predict class ids for a single image (3D array) or a batch of images (4D array)."""
    if restore_path is None:
        restore_path = os.path.join(MODEL_PATH, model.__name__)
    graph = tf.Graph()
    with graph.as_default():
        (x, y), logits, accuracy_operation, loss_operation, _ = build_graph(model, params)
        with tf.Session(graph=graph) as session:
            session.run(tf.global_variables_initializer())
            saver = tf.train.Saver()
            saver.restore(session, restore_path)
            logger.info("Restored model from {}".format(restore_path))
            # tf.argmax replaces the deprecated tf.arg_max alias
            prediction_operation = tf.argmax(logits, 1)
            if X_data.ndim == 3:
                return session.run(prediction_operation, {x: X_data[np.newaxis]})[0]
            else:
                n_samples = len(X_data)
                predictions = np.zeros(n_samples, dtype=np.int8)
                for i in range(n_samples):
                    # Each run returns an array of length 1; take its only element
                    predictions[i] = session.run(prediction_operation, {x: X_data[i:i+1]})[0]
                return predictions
with open(PREPROCESSED_DATA_DIR+"/train.p", "rb") as f:
training_dataset = pickle.load(f)
with open(PREPROCESSED_DATA_DIR+"/valid.p", "rb") as f:
validation_dataset = pickle.load(f)
with open(PREPROCESSED_DATA_DIR+"/test.p", "rb") as f:
testing_dataset = pickle.load(f)
display_random_sample(*training_dataset, 10, classes=[0,20], dpi=72)
A plain LeNet gives at least 96% accuracy.
MODEL_PATH = 'models3'
params = dict(
image_shape = training_dataset[0].shape[1:],
n_classes = 43,
learning_rate = 0.001,
learning_decay = .01,
batch_size = 128,
epochs = 200,
l2_regularizer = 0.0005,
early_stopping_patience = 20)
params
model = get_model_architecture()
training_history, restore_path = train_model(model, params, training_dataset, validation_dataset);
display_training_history(training_history)
test_model(model, params, testing_dataset)
To give yourself more insight into how your model is working, download at least five pictures of German traffic signs from the web and use your model to predict the traffic sign type.
You may find signnames.csv useful as it contains mappings from the class id (integer) to the actual sign name.
### Load the images and plot them here.
### Feel free to use as many code cells as needed.
extra_file = 'data/extra.p'
with open(extra_file, mode='rb') as f:
extra = pickle.load(f)
print("Number of extra images:", len(extra[0]))
showgrid(extra[0], extra[1]);
extra_dataset = (preprocess(extra[0]), extra[1])
showgrid(extra_dataset[0], cmap='gray');
### Run the predictions here and use the model to output the prediction for each image.
### Make sure to pre-process the images with the same pre-processing pipeline used earlier.
### Feel free to use as many code cells as needed.
predictions = predict(model, params, extra_dataset[0])
matches = predictions == extra_dataset[1]
showgrid(extra[0], zip(predictions, matches));
### Calculate the accuracy for these 5 new images.
### For example, if the model predicted 1 out of 5 signs correctly, it's 20% accurate on these new images.
# Accuracy over the images that belong to one of the classes in the dataset
accuracy = matches.sum() / (len(predictions) - (extra[1] == 99).sum())
print("Accuracy = {:.1f}%".format(accuracy*100))
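The accuracy above excludes images whose label is 99, which appears to mark signs that do not belong to any class in the dataset. An equivalent way to express this is to mask those images out before comparing. The sketch below uses made-up labels and predictions; `accuracy_on_known` is a hypothetical helper.

```python
import numpy as np


def accuracy_on_known(predictions, labels, unknown_label=99):
    """Accuracy over the samples whose label is a known class."""
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)
    known = labels != unknown_label
    # Note: returns nan if no sample has a known label
    return (predictions[known] == labels[known]).mean()


# Made-up example: the third image has no matching class in the dataset
toy_labels = [1, 2, 99, 4]
toy_predictions = [1, 3, 5, 4]
acc = accuracy_on_known(toy_predictions, toy_labels)
print("Accuracy = {:.1f}%".format(acc * 100))
```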
def random_class_sample(label):
label_class = X_valid[y_valid == label]
im = label_class[np.random.randint(0, len(label_class))]
lab = cv2.cvtColor(im, cv2.COLOR_RGB2LAB)
lab[..., 0] = cv2.equalizeHist(lab[..., 0])
return cv2.cvtColor(lab, cv2.COLOR_LAB2RGB)
errors = np.vstack([extra[0][~matches], list(map(random_class_sample, predictions[~matches]))])
error_labels = np.concatenate((predictions[~matches], predictions[~matches]))
showgrid(errors, error_labels, rows=2, cols=len(errors)//2)
For each of the new images, print out the model's softmax probabilities to show the certainty of the model's predictions (limit the output to the top 5 probabilities for each image). tf.nn.top_k could prove helpful here.
The example below demonstrates how tf.nn.top_k can be used to find the top k predictions for each image.
tf.nn.top_k will return the values and indices (class ids) of the top k predictions. So if k=3, for each sign, it'll return the 3 largest probabilities (out of a possible 43) and the corresponding class ids.
Take this numpy array as an example. The values in the array represent predictions. The array contains softmax probabilities for five candidate images with six possible classes. tf.nn.top_k is used to choose the three classes with the highest probability:
# (5, 6) array
a = np.array([[ 0.24879643, 0.07032244, 0.12641572, 0.34763842, 0.07893497,
0.12789202],
[ 0.28086119, 0.27569815, 0.08594638, 0.0178669 , 0.18063401,
0.15899337],
[ 0.26076848, 0.23664738, 0.08020603, 0.07001922, 0.1134371 ,
0.23892179],
[ 0.11943333, 0.29198961, 0.02605103, 0.26234032, 0.1351348 ,
0.16505091],
[ 0.09561176, 0.34396535, 0.0643941 , 0.16240774, 0.24206137,
0.09155967]])
Running it through sess.run(tf.nn.top_k(tf.constant(a), k=3)) produces:
TopKV2(values=array([[ 0.34763842, 0.24879643, 0.12789202],
[ 0.28086119, 0.27569815, 0.18063401],
[ 0.26076848, 0.23892179, 0.23664738],
[ 0.29198961, 0.26234032, 0.16505091],
[ 0.34396535, 0.24206137, 0.16240774]]), indices=array([[3, 0, 5],
[0, 1, 4],
[0, 5, 1],
[1, 3, 5],
[1, 4, 3]], dtype=int32))
Looking just at the first row, we get [ 0.34763842, 0.24879643, 0.12789202]. You can confirm these are the 3 largest probabilities in the first row of a. You'll also notice that [3, 0, 5] are the corresponding indices.
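To make the mechanics concrete without a TensorFlow session, here is a rough NumPy sketch of the two steps involved: turning logits into softmax probabilities, then picking the k largest per row. The helper names `softmax` and `top_k` are hypothetical, and this is not how tf.nn.top_k is implemented; in particular, the order of tied values may differ.

```python
import numpy as np


def softmax(logits):
    """Row-wise softmax, shifted by the row maximum for numerical stability."""
    shifted = logits - logits.max(axis=1, keepdims=True)
    exp = np.exp(shifted)
    return exp / exp.sum(axis=1, keepdims=True)


def top_k(probabilities, k):
    """Return the k largest values per row and their column indices."""
    indices = np.argsort(-probabilities, axis=1)[:, :k]
    values = np.take_along_axis(probabilities, indices, axis=1)
    return values, indices


# First row of the example array above.
row = np.array([[0.24879643, 0.07032244, 0.12641572,
                 0.34763842, 0.07893497, 0.12789202]])
values, indices = top_k(row, k=3)
print(values, indices)

# Softmax rows sum to one and preserve the ordering of the logits.
probs = softmax(np.array([[1.0, 2.0, 3.0]]))
print(probs)
```

Sorting the negated array gives descending order, so the first k columns are the most probable classes.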
### Print out the top five softmax probabilities for the predictions on the German traffic sign images found on the web.
### Feel free to use as many code cells as needed.
def predict_probabilities(model, params, X_data, top_k, restore_path=None):
if restore_path is None:
restore_path = os.path.join(MODEL_PATH, model.__name__)
graph = tf.Graph()
with graph.as_default():
(x, y), logits, accuracy_operation, loss_operation, endpoints = build_graph(model, params)
with tf.Session(graph=graph) as session:
session.run(tf.global_variables_initializer())
saver = tf.train.Saver()
saver.restore(session, restore_path)
logger.info("Restored model from {}".format(restore_path))
# top k softmax probabilities
probabilities_operation = tf.nn.top_k(tf.nn.softmax(logits), k=top_k)
if X_data.ndim == 3:
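# single image: add a batch axis and return its top-k probabilities and class ids
top_kv = session.run(probabilities_operation, {x: X_data[np.newaxis]})
return top_kv.values[0], top_kv.indices[0]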
else:
n_samples = len(X_data)
probabilities = np.zeros((n_samples, top_k), dtype=np.float32)
classes = np.zeros((n_samples, top_k), dtype=np.int8)
for i in range(n_samples):
top_kv = session.run(probabilities_operation, {x: X_data[i:i+1]})
probabilities[i] = top_kv.values
classes[i] = top_kv.indices
return probabilities, classes
probabilities, classes = predict_probabilities(model, params, extra_dataset[0], top_k=5)
n_samples = len(probabilities)
fig, axes = plt.subplots(n_samples, 4, figsize=(11, 2*n_samples))
for image, p, labels, ok, ax in zip(extra[0], probabilities, classes, matches, axes):
# reverse the order, lowest to highest to satisfy `barh`
p = p[::-1]
labels = labels[::-1]
ax[0].imshow(image)
ax[0].tick_params(axis='both', left='off', top='off', right='off', bottom='off', labelleft='off', labeltop='off', labelright='off', labelbottom='off')
ax[0].set_title("Input")
color = 'g' if ok else 'r'
r = matplotlib.patches.Rectangle((0.2, 0.2), 8, 8, color=color, alpha=.8)
ax[0].add_patch(r)
rx, ry = r.get_xy()
cx = rx + r.get_width()/2.0
cy = ry + r.get_height()/2.0
ax[0].annotate(str(labels[-1]), (cx, cy), color='w', weight='bold',
fontsize=12, ha='center', va='center')
bottom = np.arange(5) + .5
ax[1].barh(bottom, p)
ax[1].tick_params(axis='both', bottom='off', labelbottom='off')
plt.sca(ax[1])
plt.yticks(bottom, labels)
ax[1].margins(x=.5)
for rect, p_ in zip(ax[1].patches, p):
if p_ < 0.01: continue
ax[1].text(rect.get_x() + rect.get_width() + .01,
rect.get_y() + .1,
"{:.1f}%".format(p_ * 100),
ha='left', va='bottom', weight='bold')
ax[1].set_ylabel("Class")
ax[2].imshow(random_class_sample(labels[-1]))
ax[2].tick_params(axis='both', left='off', top='off', right='off', bottom='off', labelleft='off', labeltop='off', labelright='off', labelbottom='off')
ax[2].set_title("Predicted like")
if p[-2] > .4:
ax[3].imshow(random_class_sample(labels[-2]))
ax[3].tick_params(axis='both', left='off', top='off', right='off', bottom='off', labelleft='off', labeltop='off', labelright='off', labelbottom='off')
ax[3].set_title("Or ...")
else:
ax[3].axis('off')
This section is not required to complete, but it serves as an additional exercise for understanding the output of a neural network's weights. While neural networks can be a great learning device, they are often referred to as a black box. We can better understand what the weights of a neural network look like by plotting their feature maps. After successfully training your neural network, you can see what its feature maps look like by plotting the output of the network's weight layers in response to a test stimulus image. From these plotted feature maps, it's possible to see what characteristics of an image the network finds interesting. For a sign, maybe the inner network feature maps react with high activation to the sign's boundary outline or to the contrast in the sign's painted symbol.
Provided for you below is the function code that allows you to get the visualization output of any TensorFlow weight layer you want. The inputs to the function should be a stimulus image (one used during training or a new one you provide) and the TensorFlow variable name that represents the layer's state during the training process. For instance, if you wanted to see what the LeNet lab's feature maps looked like for its second convolutional layer, you could enter conv2 as the tf_activation variable.
For an example of what feature map outputs look like, check out NVIDIA's results in their paper End-to-End Deep Learning for Self-Driving Cars in the section Visualization of internal CNN State. NVIDIA was able to show that their network's inner weights had high activations to road boundary lines by comparing feature maps from an image with a clear path to one without. Try experimenting with a similar test to show that your trained network's weights are looking for interesting features, whether it's looking at differences in feature maps from images with or without a sign, or even what feature maps look like in a trained network vs a completely untrained one on the same sign image.
Your output should look something like this (above)
### Visualize your network's feature maps here.
### Feel free to use as many code cells as needed.
# image_input: the test image being fed into the network to produce the feature maps
# tf_activation: should be a tf variable name used during your training procedure that represents the calculated state of a specific weight layer
# activation_min/max: can be used to view the activation contrast in more detail; by default this function clips to the 2nd and 99th percentiles of the output
# plt_num: used to plot multiple feature map sets in the same block; increase the plt number for each new feature map entry
def outputFeatureMap(session, image_input, tf_activation, activation_min=None, activation_max=None, plt_num=1):
# Here make sure to preprocess your image_input in a way your network expects
# with size, normalization, etc. if needed
if image_input.ndim == 3:
image_input = image_input[np.newaxis]
image_input = preprocess(image_input)
# Note: x should be the same name as your network's tensorflow data placeholder variable
# If you get an error that tf_activation is not defined, it may be having trouble accessing the variable from inside a function
#activation = tf_activation.eval(session, feed_dict={'x:0':image_input})
# TensorFlow 1.0
activation = session.run(tf_activation, {'x:0':image_input})
featuremaps = activation.shape[3]
# true division, so that a partially filled last row is still counted
rows, cols = math.ceil(featuremaps / 12), 12
im_shape = activation.shape[1], activation.shape[2]
images = np.transpose(activation, axes=(3,1,2,0))
if activation_min is None:
activation_min = np.percentile(activation, 2)
if activation_max is None:
activation_max = np.percentile(activation, 99)
return showgrid(images, rows=rows, cols=cols, vmin=activation_min, vmax=activation_max, interpolation='nearest', cmap='gray')
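The layout arithmetic in outputFeatureMap can be checked in isolation. The sketch below assumes the row count should round up so that a partially filled last row is still drawn, and it uses a hypothetical activation of shape (1, 14, 14, 16); the helper `feature_map_grid` is not part of the notebook.

```python
import math

import numpy as np


def feature_map_grid(activation, cols=12):
    """Split a (1, H, W, C) activation into C images and compute the grid rows."""
    n_maps = activation.shape[3]
    # True division before ceil; floor division would drop a partial last row.
    rows = math.ceil(n_maps / cols)
    # Move the channel axis to the front: (1, H, W, C) -> (C, H, W, 1).
    images = np.transpose(activation, axes=(3, 1, 2, 0))
    return images, rows


# Hypothetical activation: one image, 14x14 spatial size, 16 feature maps.
activation = np.zeros((1, 14, 14, 16))
images, rows = feature_map_grid(activation)
print(images.shape, rows)
```

With 16 feature maps and 12 columns, two rows are needed; `math.ceil(16 // 12)` would yield only one.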
graph = tf.Graph()
with graph.as_default():
(x, y), logits, accuracy_operation, loss_operation, endpoints = build_graph(model, params)
with tf.Session(graph=graph) as session:
session.run(tf.global_variables_initializer())
restore_path = os.path.join(MODEL_PATH, model.__name__)
saver = tf.train.Saver()
saver.restore(session, restore_path)
logger.info("Restored model from {}".format(restore_path))
logger.info("Endpoints: {}".format(", ".join(endpoints.keys())))
for label in (2,20):
label_class = y_train == label
# np.random.randint excludes the upper bound, so no -1 is needed
image = X_train[label_class][np.random.randint(0, label_class.sum())]
image_p = grayscale(preprocess(image))[0, ..., 0]
text = "Stimulus {} - {}".format(label, SIGNNAMES[label])
print(text, "="*len(text), sep="\n", flush=True)
fig, ax = plt.subplots(1,2,figsize=(2,1))
ax[0].imshow(image)
ax[1].imshow(image_p, cmap='gray')
ax[0].axis('off')
ax[1].axis('off')
plt.show()
for layer in endpoints:
if not (layer.startswith("conv") or layer.startswith("transformer")):
continue
tf_activation = endpoints[layer]
print("Layer {}".format(layer), "-"*80, sep="\n", flush=True)
fig = outputFeatureMap(session, image, tf_activation)
plt.show()
Discuss how you used the visual output of your trained network's feature maps to show that it had learned to look for interesting characteristics in traffic sign images.
Answer:
Note: Once you have completed all of the code implementations and successfully answered each question above, you may finalize your work by exporting the iPython Notebook as an HTML document. You can do this by using the menu above and navigating to File -> Download as -> HTML (.html). Include the finished document along with this notebook as your submission.